package com.xzc.apitest.tabletest

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Table, TableEnvironment}
import org.apache.flink.table.api.scala._
import org.apache.flink.table.descriptors.{Csv, FileSystem, Kafka, OldCsv, Schema}
import sun.plugin.cache.OldCacheEntry

object TableApiTest {
  def main(args: Array[String]): Unit = {
    //1.创建环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)

    val tableEnv = StreamTableEnvironment.create(env)

    //    //1.1 老版本planner，流处理
    //    val settings = EnvironmentSettings.newInstance()
    //      .useOldPlanner()
    //      .inStreamingMode()
    //      .build()
    //    val oldStreamTableEnv = StreamTableEnvironment.create(env, settings)
    //
    //    //1.2 老版本，批处理
    //    val batchEnv = ExecutionEnvironment.getExecutionEnvironment
    //    val oldBatchTableEnv = BatchTableEnvironment.create(batchEnv)
    //
    //    //1.3 blink planner, 流处理
    //    val blinkStreamSettings = EnvironmentSettings.newInstance()
    //      .useBlinkPlanner()
    //      .inStreamingMode()
    //      .build()
    //    val blinkStreamTableEnv = StreamTableEnvironment.create(env, blinkStreamSettings)
    //
    //    //1.4 blink planner, 批处理
    //    val blinkBatchSettings = EnvironmentSettings.newInstance()
    //      .useBlinkPlanner()
    //      .inBatchMode()
    //      .build()
    //    val blinkBatchTableEnv = TableEnvironment.create(blinkBatchSettings)

    //2 连接外部系统，读取数据，注册表
    //2.1 读取文件
    val filePath = "D:\\git\\learning_flink\\_01_试用\\src\\main\\resources\\sensor.txt"
    tableEnv.connect(new FileSystem().path(filePath))
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE()))
      .createTemporaryTable("inputTable")

    //2.2 从kafka读取数据
    tableEnv.connect(new Kafka()
      .version("0.11")
      .topic("sensor")
      .property("zookeeper.connect", "hadoop102:2181")
      .property("bootstrap.servers", "hadoop102:9092")
    )
      .withFormat(new Csv())
      .withSchema(new Schema()
        .field("id", DataTypes.STRING())
        .field("timestamp", DataTypes.BIGINT())
        .field("temperature", DataTypes.DOUBLE()))
      .createTemporaryTable("kafkaInputTable")

    //3 查询转换
    //3.1 使用table api
    val sensorTable = tableEnv.from("inputTable")
    val resultTable = sensorTable
      .select('id, 'temperature)
      .filter('id === "sensor_1")
    //3.2 SQL
    val resultSqlTable = tableEnv.sqlQuery(
      """
        |select id, temperature
        |from inputTable
        |where id = 'sensor_1'
      """.stripMargin
    )

    val inputTable: Table = tableEnv.from("kafkaInputTable")
    inputTable.toAppendStream[(String, Long, Double)].print()
    resultTable.toAppendStream[(String, Double)].print("table")
    resultSqlTable.toAppendStream[(String, Double)].print("sql")

    env.execute("table api test")
  }
}
